{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Copyright (c) Microsoft Corporation. All rights reserved.  \n",
        "Licensed under the MIT License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# How to Set Up a PipelineEndpoint and Submit a Pipeline Using the PipelineEndpoint\n",
        "In this notebook, we will see how to set up a PipelineEndpoint and run a specific pipeline version.\n",
        "\n",
        "A PipelineEndpoint lets you update a published pipeline while keeping the same endpoint.\n",
        "It keeps track of [PublishedPipelines](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.publishedpipeline) using versions: a call to the endpoint, with optional version information, triggers the corresponding underlying published pipeline. Pipeline endpoints are uniquely named within a workspace.\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Prerequisites and AML Basics\n",
        "If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, go through the [configuration notebook](https://aka.ms/pl-config) first, if you have not already done so. It sets you up with a working config file that contains your workspace information, subscription ID, and so on.\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Workspace\n",
        "\n",
        "ws = Workspace.from_config()\n",
        "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\\n')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Notebook Overview\n",
        "In this notebook, we provide an introduction to Azure Machine Learning PipelineEndpoints. It covers:\n",
        "* [Create PipelineEndpoint](#Create-PipelineEndpoint): how to create a PipelineEndpoint.\n",
        "* [Retrieving PipelineEndpoint](#Retrieving-PipelineEndpoint): how to get a specific PipelineEndpoint from the workspace by name or Id, and how to get all [PipelineEndpoints](#Get-all-PipelineEndpoints-in-workspace) within the workspace.\n",
        "* [PipelineEndpoint Properties](#PipelineEndpoint-properties): how to get and set PipelineEndpoint properties, such as the default version.\n",
        "* [PipelineEndpoint Submission](#PipelineEndpoint-Submission): how to run a pipeline using a PipelineEndpoint."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "###  Create PipelineEndpoint\n",
        "The following input parameters are required to create a PipelineEndpoint:\n",
        "\n",
        "* *workspace*: the AML workspace.\n",
        "* *name*: the name of the PipelineEndpoint; it must be unique within the workspace.\n",
        "* *description*: a description of the PipelineEndpoint.\n",
        "* *pipeline*: a [Pipeline](#Initialization,-Steps-to-create-a-Pipeline) or [PublishedPipeline](#Publish-Pipeline) to use as the default version of the PipelineEndpoint."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "####  Initialization, Steps to create a Pipeline\n",
        "\n",
        "The best practice is to keep each step's script and its dependent files in a separate folder and to specify that folder as the `source_directory` for the step. This reduces the size of the snapshot created for the step, because only that folder is snapshotted. Since a change to any file in the `source_directory` triggers a re-upload of the snapshot, it also preserves step reuse when nothing in the step's `source_directory` has changed.\n",
        "\n",
        "> Note that if you have an AzureML Data Scientist role, you will not have permission to create compute resources. Talk to your workspace or IT admin to create the compute targets described in this section, if they do not already exist."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.compute import AmlCompute, ComputeTarget\n",
        "from azureml.core.compute_target import ComputeTargetException\n",
        "from azureml.pipeline.steps import PythonScriptStep\n",
        "from azureml.pipeline.core import Pipeline\n",
        "\n",
        "# Retrieve an already attached Azure Machine Learning compute target, or create it if it does not exist\n",
        "aml_compute_target = \"cpu-cluster\"\n",
        "try:\n",
        "    aml_compute = AmlCompute(ws, aml_compute_target)\n",
        "    print(\"Found existing compute target: {}\".format(aml_compute_target))\n",
        "except ComputeTargetException:\n",
        "    print(\"Creating new compute target: {}\".format(aml_compute_target))\n",
        "    \n",
        "    provisioning_config = AmlCompute.provisioning_configuration(vm_size = \"STANDARD_D2_V2\",\n",
        "                                                                min_nodes = 1, \n",
        "                                                                max_nodes = 4)    \n",
        "    aml_compute = ComputeTarget.create(ws, aml_compute_target, provisioning_config)\n",
        "    aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n",
        "\n",
        "# source_directory\n",
        "source_directory = 'publish_run_train'\n",
        "# define a single step pipeline for demonstration purpose.\n",
        "trainStep = PythonScriptStep(\n",
        "    name=\"Training_Step\",\n",
        "    script_name=\"train.py\", \n",
        "    compute_target=aml_compute_target, \n",
        "    source_directory=source_directory\n",
        ")\n",
        "print(\"TrainStep created\")\n",
        "# build and validate Pipeline\n",
        "pipeline = Pipeline(workspace=ws, steps=[trainStep])\n",
        "print(\"Pipeline is built\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Publish Pipeline"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from datetime import datetime\n",
        "\n",
        "timenow = datetime.now().strftime('%m-%d-%Y-%H-%M')\n",
        "\n",
        "pipeline_name = timenow + \"-Pipeline\"\n",
        "print(pipeline_name)\n",
        "\n",
        "published_pipeline = pipeline.publish(\n",
        "    name=pipeline_name, \n",
        "    description=pipeline_name)\n",
        "print(\"Newly published pipeline id: {}\".format(published_pipeline.id))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Publishing PipelineEndpoint\n",
        "Create a PipelineEndpoint with the required parameters: workspace, name, description, and pipeline."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.core import PipelineEndpoint\n",
        "\n",
        "pipeline_endpoint = PipelineEndpoint.publish(workspace=ws, name=\"PipelineEndpointTest\",\n",
        "                                            pipeline=pipeline, description=\"Test description Notebook\")\n",
        "pipeline_endpoint"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Retrieving PipelineEndpoint\n",
        "\n",
        "A PipelineEndpoint is uniquely identified by its name and by its Id within a workspace, and it can be retrieved by either one."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Get PipelineEndpoint by Name\n",
        "\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, name=\"PipelineEndpointTest\")\n",
        "pipeline_endpoint_by_name"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Get PipelineEndpoint by Id\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Get the PipelineEndpoint Id from the endpoint retrieved by name\n",
        "pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, name=\"PipelineEndpointTest\")\n",
        "endpoint_id = pipeline_endpoint_by_name.id\n",
        "\n",
        "pipeline_endpoint_by_id = PipelineEndpoint.get(workspace=ws, id=endpoint_id)\n",
        "pipeline_endpoint_by_id"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Get all PipelineEndpoints in workspace\n",
        "Returns all PipelineEndpoints in the workspace. With `active_only=True`, only active PipelineEndpoints are returned."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "endpoint_list = PipelineEndpoint.list(workspace=ws, active_only=True)\n",
        "endpoint_list"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### PipelineEndpoint properties"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Default Version of PipelineEndpoint\n",
        "Versions of a PipelineEndpoint start at \"0\" and increment each time a pipeline is added. The default version is the one that runs when no version is specified.\n",
        "\n",
        "##### Get the Default Version"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "default_version = pipeline_endpoint_by_name.get_default_version()\n",
        "default_version"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "##### Set the Default Version"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_endpoint_by_name.set_default_version(\"0\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Get the Published Pipeline corresponding to a specific version of PipelineEndpoint"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline = pipeline_endpoint_by_name.get_pipeline(\"0\")\n",
        "pipeline"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Get default version Published Pipeline"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline = pipeline_endpoint_by_name.get_pipeline()\n",
        "pipeline"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Add Published Pipeline to PipelineEndpoint\n",
        "Use `add()` to add a published pipeline that is not already present in the PipelineEndpoint. To add a pipeline and set it as the default version in one call, use `add_default()`."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_endpoint_by_name.add(published_pipeline)"
      ]
    },
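    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Set a Published Pipeline as the default version of PipelineEndpoint\n",
        "The published pipeline was already added above with `add()`, so `set_default()` is used here to make it the default version. To add a pipeline and make it the default in a single call, use `add_default()` instead."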
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Set Published Pipeline to PipelineEndpoint, if exists\n",
        "pipeline_endpoint_by_name.set_default(published_pipeline)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Get all Versions in PipelineEndpoint\n",
        "Returns a list of the published pipelines and their versions."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "versions = pipeline_endpoint_by_name.list_versions()\n",
        "\n",
        "for ve in versions:\n",
        "    print(ve.version)\n",
        "    print(ve.pipeline.id)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Get all Published Pipelines in PipelineEndpoint\n",
        "Returns the published pipelines in the PipelineEndpoint. If the `active_only` flag is set to True, only active pipelines are returned."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipelines = pipeline_endpoint_by_name.list_pipelines(active_only=True)\n",
        "pipelines"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Name property of PipelineEndpoint\n",
        "PipelineEndpoint is uniquely identified by name"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "##### Set the Name of a PipelineEndpoint"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_endpoint_by_name.set_name(name=\"NewName\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### PipelineEndpoint Submission\n",
        "A PipelineEndpoint triggers a specific pipeline version, or the default version, through either:\n",
        "* the REST endpoint\n",
        "* a `submit` call"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Run Pipeline by endpoint property of PipelineEndpoint\n",
        "Run a specific pipeline version by sending an HTTP POST request to the URL in the `endpoint` property of the PipelineEndpoint."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, name=\"NewName\")\n",
        "\n",
        "# endpoint with id \n",
        "rest_endpoint_id =  pipeline_endpoint_by_name.endpoint\n",
        "\n",
        "# for default version pipeline\n",
        "rest_endpoint_id_without_version_with_id = rest_endpoint_id\n",
        "\n",
        "# for specific version pipeline just append version info\n",
        "version=\"0\"\n",
        "rest_endpoint_id_with_version = rest_endpoint_id_without_version_with_id+\"/\"+ version\n",
        "print(rest_endpoint_id_with_version)\n",
        "pipeline_endpoint_by_name"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# endpoint with name\n",
        "rest_endpoint_name = rest_endpoint_id.split(\"Id\", 1)[0] + \"Name?name=\" + pipeline_endpoint_by_name.name\n",
        "\n",
        "# for default version pipeline\n",
        "rest_endpoint_name_without_version = rest_endpoint_name\n",
        "\n",
        "# for specific version pipeline just append version info\n",
        "version=\"0\"\n",
        "rest_endpoint_name_with_version = rest_endpoint_name_without_version+\"&pipelineVersion=\"+ version\n",
        "print(rest_endpoint_name_with_version)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "[This notebook](https://aka.ms/pl-restep-auth) shows how to authenticate to AML workspace."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.authentication import InteractiveLoginAuthentication\n",
        "import requests\n",
        "\n",
        "auth = InteractiveLoginAuthentication()\n",
        "aad_token = auth.get_authentication_header()\n",
        "\n",
        "#endpoint = pipeline_endpoint_by_name.url\n",
        "\n",
        "print(\"You can perform HTTP POST on URL {} to trigger this pipeline\".format(rest_endpoint_name_with_version))\n",
        "\n",
        "# specify the param when running the pipeline\n",
        "response = requests.post(rest_endpoint_name_with_version, \n",
        "                         headers=aad_token, \n",
        "                         json={\"ExperimentName\": \"default_pipeline\",\n",
        "                               \"RunSource\": \"SDK\",\n",
        "                               \"ParameterAssignments\": {\"1\": \"united\", \"2\":\"city\"}})"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "try:\n",
        "    response.raise_for_status()\n",
        "except Exception:    \n",
        "    raise Exception('Received bad response from the endpoint: {}\\n'\n",
        "                    'Response Code: {}\\n'\n",
        "                    'Headers: {}\\n'\n",
        "                    'Content: {}'.format(rest_endpoint, response.status_code, response.headers, response.content))\n",
        "\n",
        "run_id = response.json().get('Id')\n",
        "print('Submitted pipeline run: ', run_id)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Run Pipeline by Submit call of PipelineEndpoint\n",
        "Run a specific pipeline version using the `submit()` method of PipelineEndpoint."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# submit() takes the experiment name as its first argument and returns the submitted PipelineRun\n",
        "# submit pipeline with specific version\n",
        "pipeline_run = pipeline_endpoint_by_name.submit(\"NewName\", pipeline_version=\"0\")\n",
        "print(pipeline_run)\n",
        "\n",
        "# submit pipeline with default version\n",
        "pipeline_run = pipeline_endpoint_by_name.submit(\"NewName\")\n",
        "print(pipeline_run)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Use Experiment.submit() to Submit Pipeline\n",
        "Run a specific pipeline version using the Experiment `submit()` API."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Experiment\n",
        "\n",
        "experiment = Experiment(ws, name=\"submit_endpoint_sample\")\n",
        "pipeline_run = experiment.submit(pipeline_endpoint_by_name, tags={'endpoint_tag': \"1\"}, pipeline_version=\"0\")"
      ]
    }
  ],
  "metadata": {
    "authors": [
      {
        "name": "sanpil"
      }
    ],
    "category": "tutorial",
    "compute": [
      "AML Compute"
    ],
    "datasets": [
      "Custom"
    ],
    "deployment": [
      "None"
    ],
    "exclude_from_index": false,
    "framework": [
      "Azure ML"
    ],
    "friendly_name": "How to setup a versioned Pipeline Endpoint",
    "kernelspec": {
      "display_name": "Python 3.8 - AzureML",
      "language": "python",
      "name": "python38-azureml"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.6.7"
    },
    "order_index": 12,
    "tags": [
      "None"
    ],
    "task": "Demonstrates the use of PipelineEndpoint to run a specific version of the Published Pipeline"
  },
  "nbformat": 4,
  "nbformat_minor": 2
}